package com.danan.data_loader.initializer;

import com.danan.data_loader.config.EnvironmentConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Spring configuration that builds the Flink {@link StreamExecutionEnvironment} bean.
 *
 * <p>The environment is configured from {@link EnvironmentConfig} (REST port, parallelism and
 * checkpoint storage) and is set up with periodic exactly-once checkpointing and a fixed-delay
 * restart strategy.
 *
 * @author NanHuang
 * @since 2023/05/13
 */
@Configuration
public class EnvironmentInitializer {

    /** Flink configuration key of the REST / web UI port. */
    private static final String REST_PORT_KEY = "rest.port";

    /** Interval between two checkpoints, in milliseconds (one minute). */
    private static final long CHECKPOINT_INTERVAL_MS = 60 * 1000L;

    /** Time after which a running checkpoint is aborted, in milliseconds (ten minutes). */
    private static final long CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000L;

    /** Maximum number of checkpoints that may be in progress at the same time. */
    private static final int MAX_CONCURRENT_CHECKPOINTS = 1;

    /** Number of restart attempts before the job is declared failed. */
    private static final int RESTART_ATTEMPTS = 3;

    /**
     * Delay between two restart attempts, in milliseconds (thirty seconds).
     * {@code RestartStrategies.fixedDelayRestart(int, long)} interprets its second argument as
     * milliseconds, so the value must be expressed in that unit.
     */
    private static final long RESTART_DELAY_MS = 30 * 1000L;

    /**
     * Creates and configures the Flink stream execution environment.
     *
     * @param environmentConfig source of the REST port, the optional parallelism and the
     *                          checkpoint storage location
     * @return the configured {@link StreamExecutionEnvironment}
     */
    @Bean
    public StreamExecutionEnvironment env(@Autowired EnvironmentConfig environmentConfig) {
        // Fully qualified because the simple name clashes with Spring's @Configuration.
        org.apache.flink.configuration.Configuration flinkConf =
                new org.apache.flink.configuration.Configuration();

        // Web UI port: only set when a value is configured, mirroring the parallelism handling
        // below; otherwise the key is left unset.
        Integer restPort = environmentConfig.getPort();
        if (restPort != null) {
            flinkConf.setInteger(REST_PORT_KEY, restPort);
        }

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);

        // Parallelism is optional; when absent the environment keeps its own default.
        Integer parallelism = environmentConfig.getParallelism();
        if (parallelism != null) {
            env.setParallelism(parallelism);
        }

        // State backend and checkpointing. No state backend is set explicitly here
        // (an EmbeddedRocksDBStateBackend could be enabled at this point if required).
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMaxConcurrentCheckpoints(MAX_CONCURRENT_CHECKPOINTS);
        checkpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
        // Keep externalized checkpoints when the job is cancelled.
        checkpointConfig.setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Location where checkpoints are stored.
        checkpointConfig.setCheckpointStorage(environmentConfig.getCheckpointStorage());

        // Failure recovery: a fixed number of restart attempts with a fixed delay between them.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(RESTART_ATTEMPTS, RESTART_DELAY_MS));

        return env;
    }

}
